feat: Phase 3f — end-to-end integration test + TempDirectory lifetime fix #6356
Open
g-talbot wants to merge 7 commits into gtt/parquet-merge-pipeline-3de from
Conversation
…se 3f) Integration test that exercises the full merge actor chain:
1. Creates 2 real sorted Parquet files (via ParquetWriter with sorted_series, sort schema KV metadata, and window metadata)
2. Uploads to RamStorage
3. Seeds ParquetMergePipeline with split metadata (merge_factor=2)
4. Verifies the pipeline plans and executes a merge
5. Asserts publish_metrics_splits called with correct replaced_split_ids

Also fixes a TempDirectory lifetime bug: adds _scratch_directory_opt to ParquetSplitBatch so the merge executor's scratch directory stays alive until the uploader finishes reading the merged files. Without this, the temp directory was cleaned up between the executor handler returning and the uploader's async upload task reading the files.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
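The lifetime bug above can be sketched in miniature. This is not the PR's code: `ScratchDir` is a hypothetical stand-in for quickwit's `TempDirectory`, and the struct fields are illustrative, but the ownership pattern is the same — the batch message must own the directory guard so it survives the hop from the executor to the uploader.

```rust
// Minimal sketch of the lifetime fix. `ScratchDir` is a hypothetical stand-in
// for quickwit's TempDirectory: the directory is removed when the guard drops,
// so the batch must hold the guard until the uploader has read the files.
use std::fs;
use std::path::{Path, PathBuf};

struct ScratchDir(PathBuf);

impl ScratchDir {
    fn create(path: &Path) -> std::io::Result<Self> {
        fs::create_dir_all(path)?;
        Ok(ScratchDir(path.to_path_buf()))
    }
    fn path(&self) -> &Path {
        &self.0
    }
}

impl Drop for ScratchDir {
    fn drop(&mut self) {
        // Best-effort cleanup, mirroring TempDirectory semantics.
        let _ = fs::remove_dir_all(&self.0);
    }
}

// The batch handed from the merge executor to the uploader (fields are
// illustrative). Holding the guard here is the fix: it keeps the directory
// alive across the actor boundary.
struct ParquetSplitBatch {
    split_files: Vec<PathBuf>,
    _scratch_directory_opt: Option<ScratchDir>,
}

fn main() -> std::io::Result<()> {
    let dir = ScratchDir::create(Path::new("scratch_demo"))?;
    let merged = dir.path().join("merged.parquet");
    fs::write(&merged, b"parquet bytes")?;

    // The executor handler returns here. Without the `_scratch_directory_opt`
    // field, `dir` would drop now and the uploader would find the file gone.
    let batch = ParquetSplitBatch {
        split_files: vec![merged],
        _scratch_directory_opt: Some(dir),
    };

    // The "uploader" reads the file later: still present, because the batch
    // owns the guard. Dropping the batch finally removes the directory.
    assert!(batch.split_files[0].exists());
    drop(batch);
    assert!(!Path::new("scratch_demo").exists());
    Ok(())
}
```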
…y comment Review findings:
1. ParquetWriterConfig was hardcoded to Default in the executor. If ingest uses custom compression, merge output would differ. Now threaded from ParquetMergePipelineParams through to the executor.
2. Fixed misleading comment claiming "planner will eventually re-plan" on merge failure. In reality, input splits are drained by operations() and won't be re-planned until the pipeline restarts with metastore re-seeding (not yet implemented — TODO added).
3. Added TODO for fetch_immature_parquet_splits() on pipeline respawn, matching the Tantivy MergePipeline pattern.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…riter_config Review findings addressed:
1. fetch_immature_splits(): on pipeline respawn after a crash, queries the metastore for published Parquet splits so the planner can re-plan merges that were in flight during the crash. On first spawn, uses the initial splits from the IndexingService (same as the Tantivy pattern).
2. ParquetWriterConfig threaded from pipeline params to the executor so merge output uses the same compression as ingest.
3. Fixed the misleading "planner will eventually re-plan" comment on merge failure — honest about the limitation that failed splits wait for respawn re-seeding.
4. Added index_uid to ParquetMergePipelineParams for metastore queries.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
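The first-spawn vs. respawn branching described in point 1 can be sketched as follows. All types here (`SplitMetadata`, `MockMetastore`, the `maturity_reached` flag) are illustrative stand-ins, not quickwit's real API; the sketch only shows the seeding decision.

```rust
// Hypothetical sketch of fetch_immature_splits(): on first spawn the planner
// is seeded with splits handed over by the IndexingService; on respawn it
// re-fetches published, still-mergeable ("immature") Parquet splits from the
// metastore so in-flight merges lost to a crash get re-planned.
#[derive(Clone, Debug, PartialEq)]
struct SplitMetadata {
    split_id: String,
    maturity_reached: bool,
}

struct MockMetastore {
    published: Vec<SplitMetadata>,
}

impl MockMetastore {
    fn list_published_parquet_splits(&self) -> Vec<SplitMetadata> {
        self.published.clone()
    }
}

fn fetch_immature_splits(
    generation: usize,
    initial_splits: &[SplitMetadata],
    metastore: &MockMetastore,
) -> Vec<SplitMetadata> {
    if generation == 0 {
        // First spawn: same as the Tantivy pattern, use the initial splits.
        initial_splits.to_vec()
    } else {
        // Respawn after crash: re-seed from the metastore, skipping splits
        // that are already mature (no further merging needed).
        metastore
            .list_published_parquet_splits()
            .into_iter()
            .filter(|split| !split.maturity_reached)
            .collect()
    }
}

fn main() {
    let metastore = MockMetastore {
        published: vec![
            SplitMetadata { split_id: "a".into(), maturity_reached: false },
            SplitMetadata { split_id: "b".into(), maturity_reached: true },
        ],
    };
    // Respawn: only the immature split "a" is re-seeded.
    let reseeded = fetch_immature_splits(1, &[], &metastore);
    assert_eq!(reseeded.len(), 1);
    assert_eq!(reseeded[0].split_id, "a");
}
```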
Matches the Tantivy MergePipeline pattern:
- ObservableState is now MergeStatistics (was unit type)
- perform_observe() collects counters from uploader + publisher handles
- Tracks generation, num_spawn_attempts, num_ongoing_merges, num_uploaded_splits, num_published_splits
- previous_generations_statistics preserved across respawns

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
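A plausible shape for this observable state, using the field names from the commit message. The aggregation logic shown is an assumption about how `previous_generations_statistics` would be folded in; the real quickwit code may differ.

```rust
// Hypothetical sketch of the MergeStatistics observable state. Field names
// follow the commit message; the fold-in logic is an assumption.
#[derive(Clone, Default, Debug)]
struct MergeStatistics {
    generation: usize,
    num_spawn_attempts: usize,
    num_ongoing_merges: usize,
    num_uploaded_splits: usize,
    num_published_splits: usize,
}

impl MergeStatistics {
    // On respawn, cumulative counters from the finished generation are folded
    // in so the observable state survives pipeline restarts.
    // num_ongoing_merges is a gauge for the current generation only, so it is
    // deliberately not folded.
    fn add_previous_generation(mut self, previous: &MergeStatistics) -> Self {
        self.generation = previous.generation + 1;
        self.num_spawn_attempts += previous.num_spawn_attempts;
        self.num_uploaded_splits += previous.num_uploaded_splits;
        self.num_published_splits += previous.num_published_splits;
        self
    }
}

fn main() {
    let gen0 = MergeStatistics {
        num_uploaded_splits: 3,
        num_published_splits: 3,
        ..Default::default()
    };
    // A respawned pipeline starts fresh but carries the old totals forward.
    let gen1 = MergeStatistics { num_uploaded_splits: 2, ..Default::default() }
        .add_previous_generation(&gen0);
    assert_eq!(gen1.generation, 1);
    assert_eq!(gen1.num_uploaded_splits, 5);
    println!("{gen1:?}");
}
```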
Both Tantivy and Parquet merge scheduling used identical score logic (prefer merges that reduce more splits for less total bytes). Extracted the core arithmetic into score_merge(num_splits, total_bytes) and have both score_merge_operation() and score_parquet_merge_operation() call it. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
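The shared helper could look like the sketch below. The commit message only states the preference ("reduce more splits for less total bytes"); the exact formula here — splits removed per MiB of input — is an illustrative assumption, not the PR's actual arithmetic.

```rust
// Hypothetical sketch of the extracted scoring helper: higher scores favor
// merges that remove more splits per byte of input read. The concrete formula
// is an assumption; only the "more splits, fewer bytes" preference is from
// the commit message.
fn score_merge(num_splits: usize, total_bytes: u64) -> f64 {
    if num_splits < 2 || total_bytes == 0 {
        return 0.0; // nothing to merge, or degenerate input
    }
    // Merging N splits into 1 removes N - 1 splits; normalize by input size.
    let splits_removed = (num_splits - 1) as f64;
    let mebibytes = total_bytes as f64 / (1024.0 * 1024.0);
    splits_removed / mebibytes.max(1.0)
}

// Both format-specific scorers become thin wrappers over the shared core.
fn score_parquet_merge_operation(split_sizes: &[u64]) -> f64 {
    score_merge(split_sizes.len(), split_sizes.iter().sum())
}

fn main() {
    // Merging 4 small splits beats merging 2 splits of the same total size.
    let a = score_parquet_merge_operation(&[256, 256, 256, 256]);
    let b = score_parquet_merge_operation(&[512, 512]);
    assert!(a > b);
}
```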
Per CODE_STYLE.md: comments should convey intent, not implementation. Added explanations for num_merge_ops lineage, known_split_ids rebuild heuristic, output dir isolation, empty merge handling, scratch dir lifetime, permit Drop safety, publisher setter ordering, and feedback loop guard conditions. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…line Reads parquet_indexing.sort_fields and parquet_indexing.window_duration_secs from IndexingSettings when constructing the ingest pipeline's TableConfig (was hardcoded to defaults). Adds parquet_merge_policy_from_settings(), which converts the config-layer ParquetMergePolicyConfig to an Arc<dyn ParquetMergePolicy> runtime policy, paralleling merge_policy_from_settings() for Tantivy.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
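The config-to-runtime-policy conversion follows a common Rust pattern: match on a serializable config enum and return a trait object. The enum variants, trait, and policy structs below are illustrative stand-ins, not quickwit's real types; only the function's name and `Arc<dyn ParquetMergePolicy>` signature come from the commit message.

```rust
use std::sync::Arc;

// Hypothetical config-layer enum (the real ParquetMergePolicyConfig variants
// are not shown in the PR; these are stand-ins for illustration).
#[derive(Clone)]
enum ParquetMergePolicyConfig {
    NoMerge,
    ConstWriteAmplification { merge_factor: usize },
}

// Hypothetical runtime trait; the real ParquetMergePolicy surface is unknown.
trait ParquetMergePolicy: Send + Sync {
    fn merge_factor(&self) -> usize;
}

struct NoMergePolicy;
impl ParquetMergePolicy for NoMergePolicy {
    fn merge_factor(&self) -> usize {
        1 // never combine splits
    }
}

struct ConstPolicy {
    merge_factor: usize,
}
impl ParquetMergePolicy for ConstPolicy {
    fn merge_factor(&self) -> usize {
        self.merge_factor
    }
}

// Convert the config-layer enum into a shared runtime policy, paralleling
// merge_policy_from_settings() for Tantivy.
fn parquet_merge_policy_from_settings(
    config: &ParquetMergePolicyConfig,
) -> Arc<dyn ParquetMergePolicy> {
    match config {
        ParquetMergePolicyConfig::NoMerge => Arc::new(NoMergePolicy),
        ParquetMergePolicyConfig::ConstWriteAmplification { merge_factor } => {
            Arc::new(ConstPolicy { merge_factor: *merge_factor })
        }
    }
}

fn main() {
    let policy = parquet_merge_policy_from_settings(
        &ParquetMergePolicyConfig::ConstWriteAmplification { merge_factor: 2 },
    );
    assert_eq!(policy.merge_factor(), 2);
}
```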
Summary
Stacked on #6354 (Phase 3d+3e). Adds an end-to-end integration test and fixes a bug discovered while writing it.
Integration test
Exercises the full merge actor chain in-process with RamStorage and mock metastore:
1. Creates 2 real sorted Parquet files (via ParquetWriter with sorted_series, sort schema KV metadata, and window metadata)
2. Uploads them to RamStorage
3. Seeds ParquetMergePipeline with split metadata (merge_factor=2)
4. Verifies the pipeline plans and executes a merge
5. Asserts publish_metrics_splits is called with the correct replaced_split_ids=["split-a", "split-b"]

Bug fix: TempDirectory lifetime
Found and fixed a bug where the merge executor's scratch TempDirectory was dropped before the uploader's async upload task could read the merged files. Added _scratch_directory_opt: Option<TempDirectory> to ParquetSplitBatch so the directory stays alive until the upload task completes.

Test plan
- cargo clippy clean
- metrics feature

🤖 Generated with Claude Code